In [1]:
# export PYSPARK_PYTHON=/usr/bin/python3
In [2]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext
In [3]:
sc = SparkContext()
sqlContext = SQLContext(sc)
In [4]:
dataPath = '../Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv'

crimeDataSchema = StructType([StructField("IncidntNum", LongType(), True),
                              StructField("Category", StringType(), True),
                              StructField("Descript", StringType(), True),
                              StructField("DayOfWeek", StringType(), True),
                              StructField("Date", StringType(), True),
                              StructField("Time", StringType(), True),
                              StructField("PdDistrict", StringType(), True),
                              StructField("Resolution", StringType(), True),
                              StructField("Address", StringType(), True),
                              StructField("X", DoubleType(), True),
                              StructField("Y", DoubleType(), True),
                              StructField("Location", StringType(), True),
                              StructField("PdId", LongType(), True)])

crimeDF = (sqlContext.read
           .format('csv')
           .option('delimiter', ',')
           .option('header', 'true')
           .load(dataPath, schema=crimeDataSchema)
           .cache())

# crimeDF.take(1)

Visualizations

1 Counts of Different Crimes

Let's first understand what types of crimes there are, and the frequencies of each.

In [5]:
crime_types = crimeDF.groupBy('Category').count()
category_rows = crime_types.select('Category', 'Count').orderBy('Count', ascending=True).collect()
In [6]:
category_counts = [(row.Category, row.Count) for row in category_rows]
# print(category_counts)
In [29]:
import matplotlib.pyplot as plt
import numpy as np

cats = [r[0] for r in category_counts]
vals = [r[1] for r in category_counts]

plt.figure(figsize=(10,15))
plt.barh(np.arange(len(cats)), vals, align='center')
plt.yticks(np.arange(len(cats)), cats)
plt.title('Crime Counts')
plt.xlabel('Counts')

for i, val in enumerate(vals):
    plt.text(val+1000, i-0.1, str(val))

plt.show()
In [8]:
crimes = (crimeDF.groupBy('Category')
          .count()
          .select('Category')
          .orderBy('Count', ascending=False)
          .collect())
crimes = [row.Category for row in crimes]
print("Crimes: ", crimes)

districts = (crimeDF.groupBy('PdDistrict')
             .count()
             .select('PdDistrict')
             .collect())
districts = [row.PdDistrict for row in districts if row.PdDistrict is not None]
print("Districts: ", districts)

category_district_count = (crimeDF.groupBy('Category', 'PdDistrict')
                           .count()
                           .select('Category', 'PdDistrict', 'Count')
                           .collect())
category_district_count = [(r.Category, r.PdDistrict, r.Count) for r in category_district_count]
# print(category_district_count)
Crimes:  ['LARCENY/THEFT', 'OTHER OFFENSES', 'NON-CRIMINAL', 'ASSAULT', 'VEHICLE THEFT', 'DRUG/NARCOTIC', 'VANDALISM', 'WARRANTS', 'BURGLARY', 'SUSPICIOUS OCC', 'MISSING PERSON', 'ROBBERY', 'FRAUD', 'SECONDARY CODES', 'FORGERY/COUNTERFEITING', 'WEAPON LAWS', 'TRESPASS', 'PROSTITUTION', 'STOLEN PROPERTY', 'SEX OFFENSES, FORCIBLE', 'DISORDERLY CONDUCT', 'DRUNKENNESS', 'RECOVERED VEHICLE', 'DRIVING UNDER THE INFLUENCE', 'KIDNAPPING', 'RUNAWAY', 'LIQUOR LAWS', 'ARSON', 'EMBEZZLEMENT', 'LOITERING', 'SUICIDE', 'FAMILY OFFENSES', 'BAD CHECKS', 'BRIBERY', 'EXTORTION', 'SEX OFFENSES, NON FORCIBLE', 'GAMBLING', 'PORNOGRAPHY/OBSCENE MAT', 'TREA']
Districts:  ['MISSION', 'BAYVIEW', 'CENTRAL', 'TARAVAL', 'TENDERLOIN', 'INGLESIDE', 'PARK', 'SOUTHERN', 'RICHMOND', 'NORTHERN']

NB: "TREA" is defined as "Trespassing or loitering near posted industrial property"

In [9]:
# Create mapping from text to num for 2d heatmap
crime_index = {crime: index for (index, crime) in enumerate(crimes)}
# print(crime_index)

district_index = {district: index for (index, district) in enumerate(districts)}
# print(district_index)
In [10]:
heatmap_grid = np.zeros([len(crimes), len(districts)])

for (crime, dist, count) in category_district_count:
    if not dist is None:
        heatmap_grid[crime_index[crime]][district_index[dist]] = count    
    
# print(heatmap_grid)    
In [11]:
import matplotlib.patheffects as PathEffects

fig, ax = plt.subplots(figsize=(30, 50))
im = ax.imshow(heatmap_grid, cmap='hot')

ax.set_xticks(np.arange(len(districts)))
ax.set_yticks(np.arange(len(crimes)))

ax.set_xticklabels(districts)
ax.set_yticklabels(crimes)
ax.xaxis.tick_top()

plt.setp(ax.get_xticklabels(), ha="center")

for i in range(len(crimes)):
    for j in range(len(districts)):
        text = ax.text(j, i, heatmap_grid[i, j],
                       ha="center", va="center", color="w")
        text.set_path_effects([PathEffects.withStroke(linewidth=5, foreground='black')])

ax.set_title("Crimes comitted per district")
fig.tight_layout()
plt.show()

2 How Crimes Fluctuate over Time

In [12]:
# First cache a DF with an actual date object
from pyspark.sql.functions import udf
import datetime

def parseDate(dateStr):
    tokens = dateStr.split("/")
    month = int(tokens[0])
    date = int(tokens[1])
    year = int(tokens[2])
    return datetime.date(year, month, date)

date_udf = udf(parseDate, DateType())

crime_with_date = (crimeDF.withColumn("Date_tmp", date_udf(crimeDF.Date))
         .drop("Date")
         .withColumnRenamed("Date_tmp", "Date")
         .select("Category", "Date", "PdDistrict")
         .cache())
crime_with_date.printSchema()
crime_with_date.take(5)
root
 |-- Category: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- PdDistrict: string (nullable = true)

Out[12]:
[Row(Category='NON-CRIMINAL', Date=datetime.date(2015, 1, 19), PdDistrict='MISSION'),
 Row(Category='ROBBERY', Date=datetime.date(2015, 2, 1), PdDistrict='TENDERLOIN'),
 Row(Category='ASSAULT', Date=datetime.date(2015, 2, 1), PdDistrict='TENDERLOIN'),
 Row(Category='SECONDARY CODES', Date=datetime.date(2015, 2, 1), PdDistrict='TENDERLOIN'),
 Row(Category='VANDALISM', Date=datetime.date(2015, 1, 27), PdDistrict='NORTHERN')]

This section will analyze the rise and fall in popularity of various crimes over 15 years.

In [13]:
extract_year_udf = udf(lambda datetime: datetime.year, LongType())

crime_with_year = (crime_with_date
                   .withColumn("Year", extract_year_udf(crime_with_date.Date))
                   .groupBy("Year", "Category", "PdDistrict")
                   .count()
                   .select("Year", "Category", "PdDistrict", "Count")
                   .orderBy("Year", ascending=True)
                   .cache())
crime_with_year.take(10)
# print(len(crime_with_year.collect()))
Out[13]:
[Row(Year=2003, Category='NON-CRIMINAL', PdDistrict='PARK', Count=660),
 Row(Year=2003, Category='KIDNAPPING', PdDistrict='TARAVAL', Count=26),
 Row(Year=2003, Category='VEHICLE THEFT', PdDistrict='SOUTHERN', Count=1426),
 Row(Year=2003, Category='DRUG/NARCOTIC', PdDistrict='PARK', Count=383),
 Row(Year=2003, Category='TRESPASS', PdDistrict='TENDERLOIN', Count=136),
 Row(Year=2003, Category='KIDNAPPING', PdDistrict='TENDERLOIN', Count=38),
 Row(Year=2003, Category='BAD CHECKS', PdDistrict='SOUTHERN', Count=34),
 Row(Year=2003, Category='DRIVING UNDER THE INFLUENCE', PdDistrict='RICHMOND', Count=30),
 Row(Year=2003, Category='EMBEZZLEMENT', PdDistrict='NORTHERN', Count=30),
 Row(Year=2003, Category='BAD CHECKS', PdDistrict='TENDERLOIN', Count=6)]
In [14]:
from matplotlib.ticker import MaxNLocator
x = range(2003, 2019) #16 years of data, [2003, 2019)

fig = plt.figure(figsize=(15, 100))
for i, crime in enumerate(crimes):
    y = []
    
    for district in districts:
        district_crime_counts = [0]*(2019-2003)

        district_crime_count_by_year = (crime_with_year
                                        .filter(crime_with_year.PdDistrict == district)
                                        .filter(crime_with_year.Category == crime)
                                        .collect())
        for year_row in district_crime_count_by_year:
            district_crime_counts[year_row.Year-2003] = year_row.Count

        y.append(district_crime_counts)
        
    fig.add_subplot(20,2,i+1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.stackplot(x, y, labels=districts)
    plt.legend(loc='upper left')
    plt.title(crime)
    plt.ylabel("Counts")
plt.show()
# print(y)

NB: Data for 2018 is not complete, hence the downward trend for all crimes in 2018

This section will analyze how the month of the year impacts the frequency of different criminal activities.

In [15]:
extract_month_udf = udf(lambda datetime: datetime.month, LongType())

crime_with_month_year = (crime_with_date
                   .withColumn("Year", extract_year_udf(crime_with_date.Date))
                   .withColumn("Month", extract_month_udf(crime_with_date.Date))
                   .groupBy("Year", "Month", "Category")
                   .count()
                   .select("Year", "Month", "Category", "Count")
                   .orderBy("Month", ascending=True)
                   .cache())
crime_with_month_year.take(10)
# print(len(crime_with_year.collect()))
Out[15]:
[Row(Year=2012, Month=1, Category='EXTORTION', Count=7),
 Row(Year=2005, Month=1, Category='FORGERY/COUNTERFEITING', Count=222),
 Row(Year=2011, Month=1, Category='SEX OFFENSES, NON FORCIBLE', Count=2),
 Row(Year=2017, Month=1, Category='FORGERY/COUNTERFEITING', Count=44),
 Row(Year=2010, Month=1, Category='DRUG/NARCOTIC', Count=889),
 Row(Year=2013, Month=1, Category='TRESPASS', Count=84),
 Row(Year=2012, Month=1, Category='ARSON', Count=29),
 Row(Year=2016, Month=1, Category='ROBBERY', Count=266),
 Row(Year=2003, Month=1, Category='LARCENY/THEFT', Count=2121),
 Row(Year=2012, Month=1, Category='ROBBERY', Count=312)]
In [16]:
year_month_stats = (crime_with_month_year.rdd
 .map(lambda x: ((x.Year, x.Month, x.Category), x.Count))
 .reduceByKey(lambda a, b: a[1]+b[1]))
year_month_stats = year_month_stats.collect()
# print(year_month_stats)
In [17]:
#  <---2003 ----- 2018--->
#  ^
#  | jan
#  | ...
#  | dec
#  v

# z = [
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
#     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
In [18]:
crime_radial_plot = {crime:np.zeros([12, 16]) for crime in crimes}

for ((year, month, crime), count) in year_month_stats:
    crime_radial_plot[crime][month-1][year-2003] += count
In [19]:
for crime in crime_radial_plot:
    theta, r = np.mgrid[0:2*np.pi:13j, 0:1:17j]
    z = crime_radial_plot[crime]
    
    fig, ax = plt.subplots(figsize=(7, 7), subplot_kw=dict(projection='polar'))
    im = ax.pcolormesh(theta, r, z, cmap='YlGn')
    fig.colorbar(im, ax=ax)

    ax.set_title(crime)
    ax.set_theta_offset(np.pi/2-np.pi/12)
    ax.set_theta_direction(-1)

    ax.set_yticks(np.arange(0, 1, 1/17))
    ax.set_yticklabels([str(y) for y in range(2003, 2019)])
    ax.set_xticklabels(['', 'Feb', '', 'May', '', 'Aug', '', 'Nov'])
    ax.set_rlabel_position(-4.5*np.pi)
/Users/hongseo/env/lib/python3.7/site-packages/matplotlib/pyplot.py:514: RuntimeWarning: More than 20 figures have been opened. Figures created through the pyplot interface (`matplotlib.pyplot.figure`) are retained until explicitly closed and may consume too much memory. (To control this warning, see the rcParam `figure.max_open_warning`).
  max_open_warning, RuntimeWarning)

Each slice of the pie represents a month. Each ring represents a year. Intensity of color represents frequency of the occurrence of the crime.

In [ ]:
 
In [26]:
day_idx = {"Monday":0,
           "Tuesday":1,
           "Wednesday":2,
           "Thursday":3,
           "Friday":4,
           "Saturday":5,
           "Sunday":6
          }

crime_with_day = (crimeDF
                .groupBy("DayOfWeek", "Category", "PdDistrict")
                .count()
                .select("DayOfWeek", "Category", "PdDistrict", "Count")
                .orderBy("DayOfWeek", ascending=True)
                .cache())

x = range(0, 7)

fig = plt.figure(figsize=(15, 100))
for i, crime in enumerate(crimes):
    y = []
    
    for district in districts:
        district_crime_counts = [0]*(7-0)

        district_crime_count_by_day = (crime_with_day
                                        .filter(crime_with_day.PdDistrict == district)
                                        .filter(crime_with_day.Category == crime)
                                        .collect())
        for day_row in district_crime_count_by_day:
            district_crime_counts[day_idx[day_row.DayOfWeek]] = day_row.Count

        y.append(district_crime_counts)
        
    fig.add_subplot(20,2,i+1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.xticks(np.arange(7), day_idx)
    plt.stackplot(x, y, labels=districts)
    plt.legend(loc='upper left')
    plt.title(crime)
    plt.ylabel("Counts")
    plt.xlabel("day")
plt.show()

            
In [20]:
def parseTime(timeStr):
    tokens = timeStr.split(":")
    hour = int(tokens[0])
    minute = int(tokens[1])
    return hour

timeDF = (crimeDF.withColumn("Time_tmp", udf(parseTime, IntegerType())(crimeDF.Time))
           .drop("Time")
           .withColumnRenamed("Time_tmp", "Time")
           .select("Category", "Time", "PdDistrict")
           .cache())

crime_with_hour = (timeDF
                   .groupBy("Time", "Category", "PdDistrict")
                   .count()
                   .select("Time", "Category", "PdDistrict", "Count")
                   .orderBy("Time", ascending=True)
                   .cache())
In [21]:
x = range(0, 24) #16 years of data, [2003, 2019)

fig = plt.figure(figsize=(15, 100))
for i, crime in enumerate(crimes):
    y = []
    
    for district in districts:
        district_crime_counts = [0]*(24-0)

        district_crime_count_by_hour = (crime_with_hour
                                        .filter(crime_with_hour.PdDistrict == district)
                                        .filter(crime_with_hour.Category == crime)
                                        .collect())
        #print(district_crime_count_by_hour)
        for hour_row in district_crime_count_by_hour:
            district_crime_counts[hour_row.Time] = hour_row.Count

        y.append(district_crime_counts)
        
    fig.add_subplot(20,2,i+1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.stackplot(x, y, labels=districts)
    plt.legend(loc='upper left')
    plt.title(crime)
    plt.ylabel("Counts")
    plt.xlabel("Time since 00:00 AM")
plt.show()
In [ ]: